Use Case 4 – Adaptation to societal risk of geohazards through schools and hospitals in Greece#
Introduction#
This Use Case assesses the societal risk of schools and hospitals in Central Macedonia (Greece), considering earthquake and earthquake-triggered landslide hazards. The analysis combines hazard, exposure, and vulnerability information to estimate potential physical damage, human losses, and economic impacts for different return periods.
The results support climate adaptation planning and risk-informed decision-making for critical infrastructure such as educational and healthcare facilities. Relevant stakeholders include the Civil Protection authorities of the Region of Central Macedonia and the Municipality of Thessaloniki, Greece.
Directory structure#
This notebook assumes the following directory structure:
data/: Input datasets (exposure, hazard, fragility/vulnerability tables, administrative boundaries).
Results/: Output folder automatically created by the notebook. Results are separated between Single-Hazard (earthquakes) and Multi-Hazard (earthquakes and landslides).
[!NOTE]
<asset> is either Schools or Hospitals, depending on the selection below.
Select whether you want to compute the risk assessment for educational buildings (asset = “Schools”) or healthcare facilities (asset = “Hospitals”).
# Imports used throughout this notebook
import os
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib as mpl
from matplotlib import rcParams
from matplotlib.colors import ListedColormap
from scipy.stats import lognorm
from shapely import wkt
from shapely.geometry import Point

asset = "Schools"
Define the folder where the data is available and where the results will be saved.
data_path = Path(os.getcwd()).parent / 'data'
savepath = Path(os.getcwd()).parent / 'Results' / asset
The following color variables are defined for the Miraca theme. These colors will be used for visualizations. The font settings for the plots are also adjusted for consistency.
# miraca colors
c1 = '#4069F6' # primary blue 500
c2 = '#171E37' # black
c3 = '#64F4C0' # accent green
c4 = '#FFFFFF' # white
c5 = '#ED5861' # red
c6 = '#F8CD48' # yellow
c7 = '#72DA95' # green 500
c8 = '#373D52' # grey 900
c9 = '#8F94A3' # grey 500
c10 = '#EBEDF5' # grey 100
c11 = '#72DA95' # green
c12 = '#373D52' # blue 900
c13 = '#6687F8' # blue 400
c14 = '#EBEDF5' # blue 100
c15 = '#373D52' # green 900
c16 = '#9CF8D7' # green 400
c17 = '#E0FDF2' # green 100
# Adjust font settings
mpl.rc('font', family='Calibri')
font = {'family': 'Calibri', 'weight': 'bold'}
rcParams['mathtext.default'] = 'regular'
rcParams['mathtext.rm'] = font['family']
Hazard characterisation#
Earthquakes
Seismic hazard information is derived from the European Seismic Hazard Model 2020 (ESHM20), which provides harmonised probabilistic earthquake hazard estimates for the Euro-Mediterranean region. Peak Ground Acceleration (PGA) and spectral accelerations Sa(T) at periods T = 0.3 s, 0.6 s, 1.0 s and 1.5 s are considered for multiple return periods (73, 102, 475, 1000, 2500 and 5000 years). Hazard values are extracted for the Region of Central Macedonia, Greece.
Ground-surface intensity measures are estimated using the OpenQuake Engine through probabilistic seismic hazard analysis (PSHA), accounting for local site effects. Site amplification is incorporated using the ESRM20 Exposure-to-Site model.
Single hazard / Multi-hazards considered:
Single hazard: earthquake ground shaking (PGA and spectral accelerations).
Multi-hazard: earthquake-triggered landslides, combined with seismic hazard in subsequent sections of the analysis.
""" This dictionary initializes a DataFrame containing seismic return period data.
The 'No' column represents a sequential identifier for each seismic scenario.
The 'Return_Period' column specifies the return period in years,
and the '1/years' column provides the corresponding annual probability of occurrence (i.e., the inverse of the return period).
The resulting DataFrame, Scenaria, captures these seismic scenario details. """
data = {'No': [0,1,2,3,4,5],
'Return_Period': [73,102,475,1000,2500,5000],
'1/years': [0.0137,0.0098,0.0021,0.001,0.0003,0.0001]
}
Scenaria = pd.DataFrame(data)
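As a quick, self-contained sanity check (reconstructing `Scenaria` from the cell above), the `'1/years'` values can be compared against the exact reciprocals of the return periods. Note that the 2500- and 5000-year entries are stored more coarsely than the exact reciprocals; because these values double as column-name suffixes (e.g. `PGA-0.001`), they must match the hazard CSV exactly rather than be recomputed:

```python
import pandas as pd

# Reconstruct the scenario table from the cell above
Scenaria = pd.DataFrame({
    'Return_Period': [73, 102, 475, 1000, 2500, 5000],
    '1/years': [0.0137, 0.0098, 0.0021, 0.001, 0.0003, 0.0001],
})

# Deviation of the tabulated annual probabilities from the exact reciprocals
check = (Scenaria['1/years'] - 1 / Scenaria['Return_Period']).abs()

assert (check.iloc[:4] < 1e-5).all()  # first four entries are simple roundings
assert (check < 2e-4).all()           # the last two are coarser but close
```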
Load & visualise hazard data#
# Load hazard data
csv2_path = data_path / "Hazard_Central_Macedonia.csv"
# Load the CSV file into pandas DataFrames
df_hazard = pd.read_csv(csv2_path)
# Column names for longitude and latitude
longitude_col2 = 'lon'
latitude_col2 = 'lat'
# Convert the DataFrame to GeoDataFrames
gdf_hazard = gpd.GeoDataFrame(df_hazard, geometry=gpd.points_from_xy(df_hazard[longitude_col2], df_hazard[latitude_col2]))
# Set the CRS
gdf_hazard.set_crs(epsg=4326, inplace=True)
gdf_hazard.explore(
column="PGA-0.001",
cmap="OrRd",
legend=True,
legend_kwds={
'caption': 'PGA (T=1000 years)',
'orientation': 'horizontal' # optional: 'horizontal' or 'vertical'
},
marker_kwds={
'radius': 4,
'fill': True
}
)
Exposure Assessment#
Exposure data correspond to educational and healthcare facilities located in the Region of Central Macedonia, Greece.
Study area: Region of Central Macedonia, Greece.
Assets considered: Schools and major healthcare facilities (hospitals).
The school dataset provides high spatial coverage across the study area.
The hospital dataset focuses on the main hospitals within the Metropolitan Area of Thessaloniki.
Assets are selected based on the availability of reliable exposure information to ensure robust risk assessment results.
Load & visualise exposure data#
# Load exposure data
if asset == "Schools":
    csv1_path = data_path / "Exposure_Riskschools.csv"
elif asset == "Hospitals":
    csv1_path = data_path / "Exposure_Hospitals.csv"
else:
    print("Invalid asset type. Please choose either 'Schools' or 'Hospitals'.")
    sys.exit(1)
# Load the CSV file into pandas DataFrames
df_exposure = pd.read_csv(csv1_path)
# Column names for longitude and latitude
longitude_col1 = 'Longitude'
latitude_col1 = 'Latitude'
# Convert the DataFrame to GeoDataFrames
gdf_exposure = gpd.GeoDataFrame(df_exposure, geometry=gpd.points_from_xy(df_exposure[longitude_col1], df_exposure[latitude_col1]))
# Set the CRS
gdf_exposure.set_crs(epsg=4326, inplace=True)
gdf_exposure.explore(
marker_kwds={
"radius": 4, # Scale marker size (adjust as needed)
"fill": True
},
color=c1, # single marker colour (Miraca primary blue)
legend=True # Show legend
)
The following CSV file contains the spatially joined exposure and hazard points, with the nearest hazard point coordinates added for each exposure point.
# Perform the spatial join using the nearest neighbor method
gdf_exposure_w_hazard = gpd.sjoin_nearest(gdf_exposure, gdf_hazard, how="left", distance_col="distance")
# Add nearest x and y coordinates from gdf2
gdf_exposure_w_hazard['nearest_x'] = gdf_exposure_w_hazard[longitude_col2] # Nearest longitude from gdf2
gdf_exposure_w_hazard['nearest_y'] = gdf_exposure_w_hazard[latitude_col2] # Nearest latitude from gdf2
# Ensure the directory exists before saving
savepath.mkdir(parents=True, exist_ok=True)
# Save the joined GeoDataFrame to CSV
gdf_exposure_w_hazard.to_csv(savepath / f"Joined_nearest_hazard_{asset}.csv", index=False)
gdf_exposure_w_hazard.explore(
column="PGA-0.001",
cmap="OrRd",
legend=True,
legend_kwds={
'caption': 'PGA (T=1000 years)',
'orientation': 'horizontal' # optional: 'horizontal' or 'vertical'
},
marker_kwds={
'radius': 4,
'fill': True
},
tooltip="PGA-0.001",
)
Landslide hazard#
Landslide hazard is represented using the European Landslide Susceptibility Map (ELSUS v2) for the Region of Central Macedonia, Greece.
Landslide susceptibility information is derived from the ELSUS v2 dataset.
Raster susceptibility classes are preprocessed and converted into polygon format.
A spatial nearest-neighbour join is performed to associate each exposure point with the corresponding landslide susceptibility class.
The susceptibility index is later used for the estimation of earthquake-triggered landslide impacts within the multi-hazard assessment.
# Load the CSV
csv3_path = data_path / f"Joined_nearest_susceptibility_{asset}.csv"
susc = pd.read_csv(csv3_path)
# Convert WKT string to geometry (assuming the column is named 'geometry')
susc['geometry'] = susc['geometry'].apply(wkt.loads)
# Convert to GeoDataFrame
susc_gdf = gpd.GeoDataFrame(susc, geometry='geometry')
# Set the correct CRS if known (replace EPSG:4326 if different)
susc_gdf.set_crs("EPSG:4326", inplace=True)
susc_gdf.head()
| Longitude | Latitude | id | Name | Counsil | Material | Lateral Loading system | Code level | Floors | Lateral Coefficiency | Intensity | Typology | Area | geometry | index_right | raster_value | distance_to_raster | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 22.538980 | 40.511320 | 72 | 3ο Νηπιαγωγείο Αιγινίου | ΠΥΔΝΑΣ-ΚΟΛΙΝΔΡΟΥ | Precast | LFINF: infilled frame | CDH | 1 | NaN | SA(1.5) | Precast-CDH | 107.60 | POINT (22.53898 40.51132) | 4044155 | 1.0 | 0.0 |
| 1 | 22.543110 | 40.498490 | 73 | 1ο Δημοτικό Αιγινίου | ΠΥΔΝΑΣ-ΚΟΛΙΝΔΡΟΥ | CR: reinforced concrete | LFINF: infilled frame | CDH | 1 | 10.0 | PGA | CR_LFINF-CDH-10_H1 | 1008.00 | POINT (22.54311 40.49849) | 4041080 | 2.0 | 0.0 |
| 2 | 22.911365 | 40.734834 | 82 | 1ο Νηπιαγωγείο Ωραιοκάστρου | ΩΡΑΙΟΚΑΣΤΡΟΥ | MUR: unreinforced masonry | LWAL: load bearing wall | CDN | 1 | NaN | PGA | MUR_LWAL-DNO_H1 | 450.00 | POINT (22.91136 40.73483) | 3972329 | 3.0 | 0.0 |
| 3 | 22.919226 | 40.723864 | 83 | 2ο Γυμνάσιο Ωραιοκάστρου | ΩΡΑΙΟΚΑΣΤΡΟΥ | CR: reinforced concrete | LFINF: infilled frame | CDH | 3 | 15.0 | SA(0.6) | CR_LFINF-CDH-15_H3 | 3408.00 | POINT (22.91923 40.72386) | 3989309 | 2.0 | 0.0 |
| 4 | 22.993852 | 40.612094 | 84 | 3ο Δημοτικό Σχολείο Πυλαίας | ΠΥΛΑΙΑΣ-ΧΟΡΤΙΑΤΗ | CR: reinforced concrete | LDUAL: dual frame-wall system | CDH | 2 | NaN | PGA | CR_LDUAL-DUH_H2 | 2010.00 | POINT (22.99385 40.61209) | 4017328 | 2.0 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1713 | 22.456130 | 40.222693 | 2024 | Λύκειο Κονταριώτισσας - Κτίριο Β | ΔΙΟΥ-ΟΛΥΜΠΟΥ | CR: reinforced concrete | LDUAL: dual frame-wall system | CDH | 2 | NaN | PGA | CR_LDUAL-DUH_H2 | 1024.80 | POINT (22.45613 40.22269) | 4097147 | 3.0 | 0.0 |
| 1714 | 22.456130 | 40.222693 | 2025 | Λύκειο Κονταριώτισσας - Κτίριο Γ | ΔΙΟΥ-ΟΛΥΜΠΟΥ | CR: reinforced concrete | LDUAL: dual frame-wall system | CDH | 2 | NaN | PGA | CR_LDUAL-DUH_H2 | 1160.90 | POINT (22.45613 40.22269) | 4097147 | 3.0 | 0.0 |
| 1715 | 22.456130 | 40.222693 | 2026 | Λύκειο Κονταριώτισσας - Κτίριο Δ | ΔΙΟΥ-ΟΛΥΜΠΟΥ | CR: reinforced concrete | LDUAL: dual frame-wall system | CDH | 2 | NaN | PGA | CR_LDUAL-DUH_H2 | 225.36 | POINT (22.45613 40.22269) | 4097147 | 3.0 | 0.0 |
| 1716 | 22.456130 | 40.222693 | 2027 | Λύκειο Κονταριώτισσας - Κτίριο Ε | ΔΙΟΥ-ΟΛΥΜΠΟΥ | S: steel | LFBR: braced frame | CDH | 1 | NaN | PGA | S_LFBR-DUH_H1 | 956.37 | POINT (22.45613 40.22269) | 4097147 | 3.0 | 0.0 |
| 1717 | 22.500322 | 40.230284 | 2029 | Νηπιαγωγείο Νέας Εφέσσου | ΔΙΟΥ-ΟΛΥΜΠΟΥ | CR: reinforced concrete | LFINF: infilled frame | CDM | 1 | 5.0 | SA(0.3) | CR_LFINF-CDM-5_H1 | 112.36 | POINT (22.50032 40.23028) | 4110423 | 1.0 | 0.0 |
1718 rows × 17 columns
Visualise susceptibility map#
# Step 1: Ensure raster_value is treated as categorical with specific order
susc_gdf["Susceptibility_index"] = pd.Categorical(
susc_gdf["raster_value"],
categories=[1, 2, 3, 4, 5],
ordered=True
)
# Step 2: Define custom colors for each class (adjust as needed)
colors = ['#ffffcc', '#a1dab4', '#41b6c4', '#2c7fb8', 'red'] # light yellow to dark blue, with red for the highest class
# Step 3: Plot with .explore using the categorical column and custom colors
susc_gdf.explore(
column="Susceptibility_index",
cmap=ListedColormap(colors),
legend=True,
legend_kwds={
'caption': 'Susceptibility Index',
'orientation': 'horizontal'
},
marker_kwds={
'radius': 4,
'fill': True,
},
tooltip="Susceptibility_index"
)
Combined exposure and hazard dataset#
Exposure points are linked with the corresponding earthquake intensity measures and landslide susceptibility information.
The resulting dataset combines exposure, seismic hazard, and landslide susceptibility attributes and is used as input for the subsequent vulnerability and risk assessment steps.
# Load Exposure and Hazard Data
df = pd.read_csv(savepath / f"Joined_nearest_hazard_{asset}.csv")
# Load the CSV file with the new column
susc = pd.read_csv(data_path / f"Joined_nearest_susceptibility_{asset}.csv")
df['SAMPLE_1'] = susc['raster_value']
Vulnerability Assessment#
Seismic vulnerability of educational and healthcare facilities is assessed using building-class specific fragility and vulnerability models.
Vulnerability and fragility functions are primarily derived from the ESRM20 methodology developed within the European seismic risk framework.
Additional literature-based vulnerability models are adopted for reinforced concrete precast buildings (e.g., Yesilyurt et al., 2021), where required.
Vulnerability functions relate ground-motion intensity measures (PGA and spectral accelerations) to damage state probabilities.
Damage states are subsequently converted into damage index (loss ratio) values used in the risk assessment.
Extract values for fragility based on Typology#
# Load ESRM20 data
sheet_name = "Final"
Fragility = pd.read_excel(data_path / "fragility_various_IM_lognormal.xlsx", sheet_name=sheet_name)
Fragility.columns = ['Typology', 'IMT', 'Median_DS1', 'Median_DS2', 'Median_DS3', 'Median_DS4', 'Beta_DS1', "Beta_DS2", "Beta_DS3","Beta_DS4"]
# Initialize lists to store results
IM1 = []
IM2 = []
IM3 = []
IM4 = []
Beta_DS1 = []
Beta_DS2 = []
Beta_DS3 = []
Beta_DS4 = []
# Iterate over each value in the 'Typology' column and look up its fragility parameters
for taxonomy in df['Typology']:
    filtered_data = Fragility.loc[Fragility['Typology'] == taxonomy]
    if not filtered_data.empty:
        # Matching typology found: retrieve medians and dispersions per damage state
        IM1.append(filtered_data['Median_DS1'].values[0])
        IM2.append(filtered_data['Median_DS2'].values[0])
        IM3.append(filtered_data['Median_DS3'].values[0])
        IM4.append(filtered_data['Median_DS4'].values[0])
        Beta_DS1.append(filtered_data['Beta_DS1'].values[0])
        Beta_DS2.append(filtered_data['Beta_DS2'].values[0])
        Beta_DS3.append(filtered_data['Beta_DS3'].values[0])
        Beta_DS4.append(filtered_data['Beta_DS4'].values[0])
    else:
        # No matching typology found in the ESRM20 table
        IM1.append(None)
        IM2.append(None)
        IM3.append(None)
        IM4.append(None)
        Beta_DS1.append(None)
        Beta_DS2.append(None)
        Beta_DS3.append(None)
        Beta_DS4.append(None)
# Create a new DataFrame to store results
IM = pd.DataFrame({
'IM1': IM1,
'IM2': IM2,
'IM3': IM3,
'IM4': IM4,
'Beta_DS1': Beta_DS1,
'Beta_DS2': Beta_DS2,
'Beta_DS3': Beta_DS3 ,
'Beta_DS4': Beta_DS4
})
IM.head()
| IM1 | IM2 | IM3 | IM4 | Beta_DS1 | Beta_DS2 | Beta_DS3 | Beta_DS4 | |
|---|---|---|---|---|---|---|---|---|
| 0 | 0.475000 | 0.833000 | 1.355000 | 1.866000 | 0.285000 | 0.283 | 0.283 | 0.263 |
| 1 | 1.345492 | 3.218407 | 5.030098 | 6.708921 | 0.493866 | NaN | NaN | NaN |
| 2 | 0.314759 | 0.586519 | 0.783376 | 0.939881 | 0.456357 | NaN | NaN | NaN |
| 3 | 0.658211 | 1.429666 | 2.174719 | 2.862426 | 0.464744 | NaN | NaN | NaN |
| 4 | 0.729419 | 1.405535 | 1.856772 | 2.204783 | 0.393803 | NaN | NaN | NaN |
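For reference, the per-row typology lookup implemented with the loop above can also be written as a vectorized left merge. A minimal sketch with toy data (the column names mirror the fragility table; the values are made up):

```python
import pandas as pd

# Toy stand-ins for df['Typology'] and the ESRM20 fragility table
exposure_toy = pd.DataFrame({'Typology': ['A', 'B', 'C']})
fragility_toy = pd.DataFrame({
    'Typology': ['A', 'B'],
    'Median_DS1': [0.4, 0.6],
    'Beta_DS1': [0.3, 0.5],
})

# A left merge keeps every exposure row; unmatched typologies get NaN,
# which plays the role of the None values appended in the loop above
IM_toy = exposure_toy.merge(fragility_toy, on='Typology', how='left')
print(IM_toy)
```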
Risk Assessment#
Risk metrics are computed by combining hazard intensity measures with exposure and vulnerability information for each seismic scenario.
Asset-level damage probabilities and loss ratios are estimated for multiple return periods, followed by the computation of annual collapse probability using the stress-based methodology.
Calculate damage index#
def calculate_log_normal_distribution(beta, scale):
    """
    Creates a log-normal distribution using the given beta and scale parameters.

    Parameters:
    - beta: Beta parameter (shape) for the log-normal distribution.
    - scale: Scale parameter for the log-normal distribution (IM value).

    Returns:
    - dist: A log-normal distribution object.
    """
    return lognorm(s=beta, scale=scale)

def get_beta_values(typology, im_row):
    """
    Determines beta values based on typology. For 'Precast' typologies, missing
    DS2-DS4 beta values fall back to the DS1 beta; all other typologies use the
    DS1 beta for every damage state.

    Parameters:
    - typology: The building typology.
    - im_row: The current row from the IM DataFrame.

    Returns:
    - Tuple of beta values for DS1, DS2, DS3, and DS4.
    """
    if 'Precast' in typology:
        beta_dist1 = im_row['Beta_DS1']
        beta_dist2 = im_row['Beta_DS2'] if pd.notna(im_row['Beta_DS2']) else im_row['Beta_DS1']
        beta_dist3 = im_row['Beta_DS3'] if pd.notna(im_row['Beta_DS3']) else im_row['Beta_DS1']
        beta_dist4 = im_row['Beta_DS4'] if pd.notna(im_row['Beta_DS4']) else im_row['Beta_DS1']
    else:
        beta_dist1 = im_row['Beta_DS1']
        beta_dist2 = im_row['Beta_DS1']
        beta_dist3 = im_row['Beta_DS1']
        beta_dist4 = im_row['Beta_DS1']
    return beta_dist1, beta_dist2, beta_dist3, beta_dist4

def compute_cdf_values(row, intensity_column_map, im_row):
    """
    Computes CDF values based on the intensity and IM row data.

    Parameters:
    - row: A row from the input DataFrame.
    - intensity_column_map: A map that links intensity types to column names.
    - im_row: A row from the IM DataFrame containing beta and IM values.

    Returns:
    - A tuple of computed CDF values (cdf1, cdf2, cdf3, cdf4).
    """
    # Get the intensity type and column name for the current row
    intensity_type = row["Intensity"]
    column_name = intensity_column_map.get(intensity_type)
    x_value = row[column_name] if column_name else np.nan

    # Build the log-normal fragility distributions for each damage state
    beta_dist1, beta_dist2, beta_dist3, beta_dist4 = get_beta_values(row['Typology'], im_row)
    dist1 = calculate_log_normal_distribution(beta_dist1, im_row["IM1"])
    dist2 = calculate_log_normal_distribution(beta_dist2, im_row["IM2"])
    dist3 = calculate_log_normal_distribution(beta_dist3, im_row["IM3"])
    dist4 = calculate_log_normal_distribution(beta_dist4, im_row["IM4"])

    # Compute CDF values
    return dist1.cdf(x_value), dist2.cdf(x_value), dist3.cdf(x_value), dist4.cdf(x_value)
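The fragility curves above rely on SciPy's log-normal parameterization: `s` is the logarithmic standard deviation (beta) and `scale` is the median intensity measure, so the exceedance probability evaluated at the median IM is exactly 0.5. A minimal check with illustrative values:

```python
from scipy.stats import lognorm

# With SciPy's parameterization, s is the log-standard deviation (beta)
# and scale is the median IM, so the fragility curve passes through 0.5
# at the median intensity
frag = lognorm(s=0.3, scale=0.5)   # e.g. beta = 0.3, median IM = 0.5 g
p_at_median = frag.cdf(0.5)
assert abs(p_at_median - 0.5) < 1e-12

# Exceedance probability grows monotonically with shaking intensity
assert frag.cdf(0.2) < frag.cdf(0.5) < frag.cdf(1.0)
```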
Risk Assessment: Single-hazard risk assessment (earthquake)#
# Create a dictionary to store DataFrames from each scenario by their years_value
cdf_dict = {}
# Iterate over each seismic scenario
for index_scen, scen_row in Scenaria.iterrows():
    # Variables to store CDF results and intensity values
    cdf1, cdf2, cdf3, cdf4 = [], [], [], []

    # Scenario parameters
    years_value = scen_row['1/years']
    scenario_no = int(scen_row['No'])
    return_period = int(scen_row['Return_Period'])

    # Generate intensity column map for the current scenario
    intensity_column_map = {
        "PGA": f"PGA-{years_value}",
        "SA(0.3)": f"SA(0.3)-{years_value}",
        "SA(0.6)": f"SA(0.6)-{years_value}",
        "SA(1.0)": f"SA(1.0)-{years_value}",
        "SA(1.5)": f"SA(1.5)-{years_value}",
    }

    # Iterate over each row in the DataFrame df
    for index, row in df.iterrows():
        im_row = IM.iloc[index]  # Corresponding IM DataFrame row
        # Compute CDF values
        cdf1_val, cdf2_val, cdf3_val, cdf4_val = compute_cdf_values(row, intensity_column_map, im_row)
        cdf1.append(cdf1_val)
        cdf2.append(cdf2_val)
        cdf3.append(cdf3_val)
        cdf4.append(cdf4_val)

    # Create DataFrame to store CDF values for the current scenario
    CDF = pd.DataFrame({
        f"CDF1-{years_value}": cdf1,
        f"CDF2-{years_value}": cdf2,
        f"CDF3-{years_value}": cdf3,
        f"CDF4-{years_value}": cdf4
    })

    # Calculate probabilities of occurrence for each damage state
    CDF[f"P_DS1-{years_value}"] = CDF[f"CDF1-{years_value}"] - CDF[f"CDF2-{years_value}"]
    CDF[f"P_DS2-{years_value}"] = CDF[f"CDF2-{years_value}"] - CDF[f"CDF3-{years_value}"]
    CDF[f"P_DS3-{years_value}"] = CDF[f"CDF3-{years_value}"] - CDF[f"CDF4-{years_value}"]
    CDF[f"P_DS4-{years_value}"] = CDF[f"CDF4-{years_value}"]

    # Initialize the list for Damage Index (DI) values
    DI_values = []

    # Loop over the rows again to calculate the DI for each row
    for index, row in df.iterrows():
        # Check material type and apply the corresponding damage-state weights
        if row['Material'] == 'Precast':
            DI_value = (
                CDF.loc[index, f"P_DS1-{years_value}"] * 0.05 +
                CDF.loc[index, f"P_DS2-{years_value}"] * 0.30 +
                CDF.loc[index, f"P_DS3-{years_value}"] * 0.70 +
                CDF.loc[index, f"P_DS4-{years_value}"] * 1
            )
        else:
            DI_value = (
                CDF.loc[index, f"P_DS1-{years_value}"] * 0.05 +
                CDF.loc[index, f"P_DS2-{years_value}"] * 0.15 +
                CDF.loc[index, f"P_DS3-{years_value}"] * 0.60 +
                CDF.loc[index, f"P_DS4-{years_value}"] * 1
            )
        # Append the calculated DI to the list
        DI_values.append(DI_value)

    # Add the calculated DI values to the DataFrame
    CDF[f"DI-{years_value}"] = DI_values

    # Reset the index to align all data
    CDF.reset_index(drop=True, inplace=True)

    # Store the DataFrame for the current scenario in a dictionary
    cdf_dict[years_value] = CDF

# Concatenate all the scenario DataFrames horizontally (axis=1)
DI_seismic = pd.concat(cdf_dict.values(), axis=1)

# Drop duplicate columns, keeping only the first occurrence
DI_seismic = DI_seismic.loc[:, ~DI_seismic.columns.duplicated()]
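The damage-state probabilities computed above are differences of consecutive exceedance probabilities, so together with the implicit no-damage probability they must sum to one. A small numeric check with illustrative CDF values and the non-precast damage-index weights used in the loop above:

```python
# Toy exceedance probabilities for DS1..DS4 (must be non-increasing)
cdf1, cdf2, cdf3, cdf4 = 0.8, 0.5, 0.2, 0.05

p_ds1 = cdf1 - cdf2    # exactly DS1
p_ds2 = cdf2 - cdf3    # exactly DS2
p_ds3 = cdf3 - cdf4    # exactly DS3
p_ds4 = cdf4           # DS4 (complete damage)
p_ds0 = 1 - cdf1       # no damage (implicit in the notebook)

# All damage states together form a full probability distribution
assert abs(p_ds0 + p_ds1 + p_ds2 + p_ds3 + p_ds4 - 1.0) < 1e-12

# Damage index for a non-precast asset, with the weights from the loop above
di = p_ds1 * 0.05 + p_ds2 * 0.15 + p_ds3 * 0.60 + p_ds4 * 1.0
assert abs(di - 0.2) < 1e-9
```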
Calculate the annual collapse probability based on the stress-test methodology
# Hazard levels: return periods and their 50-year exceedance probabilities
Periods = [5, 73, 102, 475, 1000, 2500, 5000, 10000]
lamda1 = [0.9999, 0.5, 0.39, 0.1, 0.05, 0.02, 0.01, 0.005]
# Compute λ values
lambda_values = [1 / (-50 / np.log(1 - l)) for l in lamda1]
# Compute P[IM_i] using the given formula
P_IM = []
for t in range(1, len(lambda_values) - 1):  # Skip the first and last hazard levels
    P_IM_i = (lambda_values[t - 1] - lambda_values[t + 1]) / 2
    P_IM.append(P_IM_i)
# Create a dictionary mapping return periods to P[IM_i]
P_IM_dict = {data['1/years'][i]: P_IM[i] for i in range(len(P_IM))}
# Multiply each "CDF4-{r}" column by the corresponding P[IM_i]
# (the auxiliary 5- and 10000-year anchor levels are not in Scenaria)
for r in data['1/years']:
    column_name = f"CDF4-{r}"
    new_column_name = f"{column_name}*P[IM_i]"
    if column_name in DI_seismic.columns:
        DI_seismic[new_column_name] = DI_seismic[column_name] * P_IM_dict[r]
# Collect the new weighted columns
new_columns = [f"CDF4-{r}*P[IM_i]" for r in data['1/years'] if f"CDF4-{r}" in DI_seismic.columns]
# Sum across them to obtain the annual collapse probability per asset
DI_seismic["Annual Collapse Probability"] = DI_seismic[new_columns].sum(axis=1)
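The rate conversion in the cell above can be written more directly: `1 / (-50 / ln(1 - l))` is algebraically identical to `-ln(1 - l) / 50`, the annual exceedance rate implied by a 50-year exceedance probability. An illustrative recomputation, including the central-difference occurrence probabilities:

```python
import numpy as np

# 50-year exceedance probabilities for the 8 hazard levels used above
lamda1 = [0.9999, 0.5, 0.39, 0.1, 0.05, 0.02, 0.01, 0.005]

# Annual exceedance rate: lambda = -ln(1 - P_50yr) / 50
lambda_values = [-np.log(1 - l) / 50 for l in lamda1]

# The 475-year level (P_50yr = 0.10) should recover roughly 1/475 per year
assert abs(lambda_values[3] - 1 / 475) < 1e-4

# Occurrence probability of each intermediate level via central differences
P_IM = [(lambda_values[t - 1] - lambda_values[t + 1]) / 2
        for t in range(1, len(lambda_values) - 1)]
assert len(P_IM) == 6 and all(p > 0 for p in P_IM)
```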
Create and save the final DataFrame containing the damage index (loss ratio) for different return periods along with the annual probability of collapse for each exposure point.
# Concatenate df, IM, and DI_seismic horizontally, aligning by their index
common_df = pd.concat([df, IM, DI_seismic], axis=1)
# Define the path first
seismic_file_path = savepath / f"Single-Hazard/Earthquakes/DI_Earthquakes_ESRM20_{asset}.csv"
# Ensure the directory exists
seismic_file_path.parent.mkdir(parents=True, exist_ok=True)
# Save the DataFrame to CSV
common_df.to_csv(seismic_file_path, index=False)
Visualise loss ratio map for a return period of 1000 years
common_df['geometry'] = common_df.apply(lambda row: Point(row['Longitude'], row['Latitude']), axis=1)
# Convert to GeoDataFrame
seismic_gdf = gpd.GeoDataFrame(common_df, geometry='geometry', crs="EPSG:4326")
# Create bins for the DI-0.001 column
bins = [0, 0.1, 0.25, 0.4, 1] # Define the bin ranges
labels = ['0-0.1', '0.1-0.25', '0.25-0.4', '0.4-1'] # Labels for the bins
# Assign bin values based on 'DI-0.001'
seismic_gdf['DI_binned'] = pd.cut(seismic_gdf['DI-0.001'], bins=bins, labels=labels, right=False)
# Interactive map with 4 bins and custom colors
seismic_gdf.explore(
column="DI_binned", # Use the binned DI-0.001 column for coloring
cmap='OrRd', # Colormap for the binned loss-ratio classes
stroke=True, # Add stroke (outline) around markers
line_color="black", # Set the stroke color to black
legend=True, # Show the legend
legend_kwds={
'caption': 'Loss ratio (T=1000 years)', # Legend caption
'orientation': 'horizontal' # Horizontal legend
},
marker_kwds={
'radius': 5, # Set marker size (adjust as needed)
'fill': True, # Fill the markers with color
},
tooltip="DI-0.001", # Show the DI-0.001 value in the tooltip
)
Compute human and economic losses based on the ESRM20 methodology
# Load your reference data (Excel with probabilities)
reference = pd.read_excel(data_path / "fatality_damage_model_ESRM20.xlsx")
# Create empty lists to store the values
P_lethal_DS4 = []
Collapse_factor = []
P_entrap_day = []
P_entrap_night = []
P_loss_life = []
# For each row in common_df, look up values based on Typology
for taxonomy in common_df['Typology']:
    match = reference[reference['Typology'] == taxonomy]
    if not match.empty:
        P_lethal_DS4.append(match['P_lethal-building | DS4'].values[0])
        Collapse_factor.append(match['Collapse_factor'].values[0])
        P_entrap_day.append(match['P_entrapment_day'].values[0])
        P_entrap_night.append(match['P_entrapment_night'].values[0])
        P_loss_life.append(match['P_loss-life | entrapment'].values[0])
    else:
        # If not found, append None as a placeholder
        P_lethal_DS4.append(None)
        Collapse_factor.append(None)
        P_entrap_day.append(None)
        P_entrap_night.append(None)
        P_loss_life.append(None)
# Add the new columns to your DataFrame
common_df['P_lethal-building | DS4'] = P_lethal_DS4
common_df['Collapse_factor'] = Collapse_factor
common_df['P_entrapment_day'] = P_entrap_day
common_df['P_entrapment_night'] = P_entrap_night
common_df['P_loss-life | entrapment'] = P_loss_life
# Iterate through each return period in the Scenaria DataFrame
for index, row in Scenaria.iterrows():
    annual_prob = row['1/years']  # Annual probability used as column suffix
    # Create the new column name dynamically, e.g. "Fatality_damage_0.0137"
    new_column_name = f"Fatality_damage_{annual_prob}"
    # Expected fatalities: P(DS4) x lethality x collapse factor
    # x P(loss of life | entrapment) x occupants (approximated as Area / 3)
    common_df[new_column_name] = (
        common_df[f"P_DS4-{annual_prob}"] * common_df['P_lethal-building | DS4'] *
        common_df['Collapse_factor'] * common_df['P_loss-life | entrapment'] * common_df['Area'] / 3
    )
    common_df[new_column_name] = common_df[new_column_name].round(0)
# Iterate through each return period in the Scenaria DataFrame
for index, row in Scenaria.iterrows():
    annual_prob = row['1/years']  # Annual probability used as column suffix
    # Create the new column name dynamically, e.g. "Economic_Losses_0.0137"
    new_column_name = f"Economic_Losses_{annual_prob}"
    # Economic loss: built area x replacement cost (2000 EUR/m^2) x loss ratio
    common_df[new_column_name] = common_df['Area'] * 2000 * common_df[f"DI-{annual_prob}"]
# Save the updated common_df with the new columns
common_df.to_csv(seismic_file_path, index=False)
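A worked example of the two loss formulas above, with illustrative inputs (the 2000 EUR/m² replacement cost and the Area/3 occupancy proxy follow the cells above; all probability values below are made up):

```python
# Worked example of the loss formulas above (illustrative numbers)
area = 1000.0        # building area in m^2
di = 0.10            # damage index (loss ratio) for some return period

# Economic loss: area x replacement cost (2000 EUR/m^2) x loss ratio
economic_loss = area * 2000 * di
assert economic_loss == 200000.0

# Fatality estimate: P(DS4) x lethality x collapse factor
# x P(loss of life | entrapment) x occupants (approximated as Area / 3)
p_ds4, p_lethal, collapse_factor, p_loss_life = 0.02, 0.1, 0.5, 0.3
fatalities = p_ds4 * p_lethal * collapse_factor * p_loss_life * area / 3
assert fatalities < 1  # small expected value before rounding
```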
Visualise economic and human losses for a return period of 1000 years, aggregated per municipality
common_df['geometry'] = common_df.apply(lambda row: Point(row['Longitude'], row['Latitude']), axis=1)
# Convert to GeoDataFrame
seismic_gdf = gpd.GeoDataFrame(common_df, geometry='geometry', crs="EPSG:4326")
municipalities_gdf = gpd.read_file(data_path / "oria_dimon.shp")
# Ensure both datasets are in the same CRS
if seismic_gdf.crs != municipalities_gdf.crs:
    seismic_gdf = seismic_gdf.to_crs(municipalities_gdf.crs)
# Remove conflicting columns if they exist
for col in ['index_left', 'index_right']:
    if col in seismic_gdf.columns:
        seismic_gdf = seismic_gdf.drop(columns=col)
    if col in municipalities_gdf.columns:
        municipalities_gdf = municipalities_gdf.drop(columns=col)
# Step 2: Perform spatial join — assign each point in seismic_gdf to a municipality
joined = gpd.sjoin(seismic_gdf, municipalities_gdf, how="left", predicate="within")
# Step 3: Aggregate — Sum Economic Losses and Fatality (Human) Losses per municipality
agg = joined.groupby('index_right').agg({
'Economic_Losses_0.001': 'sum', # Sum the Economic Losses for each municipality
'Fatality_damage_0.001': 'sum' # Sum Human Losses for each municipality
}).rename(columns={
'Economic_Losses_0.001': 'Economic_Losses',
'Fatality_damage_0.001': 'Human Losses'
})
# Step 4: Join the aggregated data back to the municipalities
municipalities_gdf = municipalities_gdf.join(agg)
# Step 5: Apply binning for **Economic Losses**
economic_bins = [0, 400000, 3430000, 6000000, 8098000, 124540000] # Define custom bins
economic_labels = ['0-400K', '400K-3.4M', '3.4M-6M', '6M-8M', '8M+'] # Labels for the bins
# Create a new column with binned values for Economic Losses
municipalities_gdf['Economic_Losses_binned'] = pd.cut(municipalities_gdf['Economic_Losses'], bins=economic_bins, labels=economic_labels, right=False)
# Step 6: Apply binning for **Human Losses** with explicit inclusion of zero in the bins
human_bins = [0, 1, 2, 4, 7, np.inf] # Bins for Human Losses (0-1, 1-2, etc.)
human_labels = ['0-1', '1-2', '2-4', '4-7', '7+'] # Labels for the bins
# Create a new column with binned values for Human Losses (ensure '0' is included in the first bin)
municipalities_gdf['Fatality_binned'] = pd.cut(municipalities_gdf['Human Losses'], bins=human_bins, labels=human_labels, right=False)
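A note on the binning used above: with `right=False`, `pd.cut` produces left-closed intervals `[low, high)`, so a value exactly on a bin edge falls into the upper bin. A minimal sketch with illustrative loss values:

```python
import pandas as pd

# Illustrative aggregated losses and the first three economic bins from above
losses = pd.Series([0.0, 350_000, 400_000, 5_000_000])
bins = [0, 400_000, 3_430_000, 6_000_000]
labels = ['0-400K', '400K-3.4M', '3.4M-6M']

# right=False makes intervals left-closed: [0, 400K), [400K, 3.43M), [3.43M, 6M)
binned = pd.cut(losses, bins=bins, labels=labels, right=False)

assert str(binned[0]) == '0-400K'      # 0 is included in the first bin
assert str(binned[2]) == '400K-3.4M'   # 400,000 lands in the upper bin
assert str(binned[3]) == '3.4M-6M'
```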
# Plot Economic Losses
municipalities_gdf.explore(
column='Economic_Losses_binned',
cmap='Spectral_r',
legend=True,
legend_kwds={
'caption': 'Economic Losses € (T=1000 years)',
'orientation': 'horizontal'
},
tooltip='Economic_Losses' # Show the aggregated economic loss in the tooltip
)
# Plot Human Losses
municipalities_gdf.explore(
column='Fatality_binned',
cmap='Spectral_r',
legend=True,
legend_kwds={
'caption': 'Human Losses (T=1000 years)',
'orientation': 'horizontal'
},
tooltip='Human Losses' # Show the aggregated human losses in the tooltip
)
Risk Assessment: Multi-hazard risk assessment (earthquake-triggered landslides)#
This section extends the single-hazard seismic risk assessment by considering the effects of earthquake-triggered landslides.
Ground-motion intensity measures (PGA) derived from ESHM20 and estimated at ground surface using OpenQuake are used as triggering parameters for landslide occurrence.
Landslide susceptibility is characterised using the European Landslide Susceptibility Map (ELSUSv2), which is linked to yield acceleration ratio (ky) values based on FEMA (2022) recommendations and expert judgement.
Permanent Ground Displacement (PGD) caused by earthquake-induced landslides is estimated using analytical models relating PGD with PGA and yield acceleration (e.g., Fotopoulou and Pitilakis, 2015; Rathje and Antonakos, 2011).
Landslide-induced damage is evaluated using appropriate fragility relationships and combined with seismic damage probabilities.
Compound damage probabilities are computed by combining ground-shaking and landslide damage state exceedance probabilities, assuming conditional landslide occurrence given the earthquake event.
Final loss ratios are estimated considering both seismic intensity (PGA) and ground failure effects (PGD).
Results are provided in terms of asset-level loss estimates and spatial loss maps for multiple return periods.
Calculate yield acceleration ratio (ky)
# Define a function that maps the susceptibility class to a yield acceleration ratio
def calculate_value(sample):
    if sample == 1:
        return 0.4
    elif sample == 2:
        return 0.3
    elif sample == 3:
        return 0.2
    elif sample == 4:
        return 0.15
    elif sample == 5:
        return 0.1
    else:
        return 0  # default value when the susceptibility class is not 1-5

# Apply the calculation and create a new DataFrame
ky = pd.DataFrame({'ky': df['SAMPLE_1'].apply(calculate_value)})
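The same class-to-ky mapping can be expressed more compactly with `Series.map` and a dictionary; an equivalent sketch (the class values are the ELSUS susceptibility classes used above):

```python
import pandas as pd

# Susceptibility class (1-5) -> yield acceleration ratio ky, as above
KY_BY_CLASS = {1: 0.4, 2: 0.3, 3: 0.2, 4: 0.15, 5: 0.1}

sample = pd.Series([1, 3, 5, 7])               # 7 = unmapped class
ky_series = sample.map(KY_BY_CLASS).fillna(0)  # default 0, as in calculate_value
assert ky_series.tolist() == [0.4, 0.2, 0.1, 0.0]
```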
Calculate the Permanent Ground Displacement (PGD) of (earthquake-induced) landsliding using an analytical expression that relates PGD with PGA: $PGD = \exp(a + b \cdot \ln(\mathrm{PGA}) - c \cdot k_y + 0.535 \cdot d)$
def calculate_PGD(df, PGA_column, return_period):
    """
    Calculates Permanent Ground Displacement (PGD) for a given return period
    using the specified Peak Ground Acceleration (PGA) data.

    Parameters:
    df (pd.DataFrame): The input DataFrame containing PGA values and other relevant data.
    PGA_column (str): The name of the column in the DataFrame that contains the PGA values.
    return_period (int): The return period in years, which determines the value of constant 'd'.

    Returns:
    pd.Series: A Series containing the calculated PGD values for the specified return period.
    """
    # Constants for the calculation
    a = -2.965
    b = 2.127
    c = 6.583
    # Note: d could be defined over return-period intervals rather than fixed values
    if return_period == 73 or return_period == 102:
        d = 5
    elif return_period == 475:
        d = 6
    else:
        d = 7
    # Calculate PGD
    PGD = np.exp(a + b * np.log(df[PGA_column]) - c * ky["ky"] + 0.535 * d)
    return PGD  # Return as a Series
# Define return periods and corresponding PGA columns
return_periods = [73, 102, 475, 1000, 2500, 5000]
PGA_columns = ['PGA-0.0137', 'PGA-0.0098', 'PGA-0.0021', 'PGA-0.001', 'PGA-0.0003', 'PGA-0.0001']

# Create a DataFrame to store all PGD results
PGD = pd.DataFrame()

# Loop through each scenario and compute PGD from the matching PGA column
for index, row in Scenaria.iterrows():
    return_period = row['Return_Period']
    PGA_column = f'PGA-{row["1/years"]}'  # Construct the corresponding PGA column name
    PGD[f'PGD-{row["1/years"]}'] = calculate_PGD(df, PGA_column, return_period)

PGD.head(5)
| | PGD-0.0137 | PGD-0.0098 | PGD-0.0021 | PGD-0.001 | PGD-0.0003 | PGD-0.0001 |
|---|---|---|---|---|---|---|
| 0 | 0.000634 | 0.001019 | 0.009195 | 0.028004 | 0.053822 | 0.085483 |
| 1 | 0.001225 | 0.001969 | 0.017760 | 0.054091 | 0.103957 | 0.165111 |
| 2 | 0.002366 | 0.003751 | 0.033449 | 0.105478 | 0.212258 | 0.354169 |
| 3 | 0.001634 | 0.002580 | 0.021517 | 0.068424 | 0.147869 | 0.251780 |
| 4 | 0.001024 | 0.001586 | 0.012463 | 0.041079 | 0.091581 | 0.158642 |
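As a quick sanity check on the analytical expression above, the PGD for a single site can be computed by hand. This is an illustrative sketch with assumed inputs (PGA, \(k_y\), and `d` are example values, not taken from the dataset):

```python
import numpy as np

# Regression constants of the analytical PGD model above
a, b, c = -2.965, 2.127, 6.583

# Assumed illustrative inputs (not from the dataset)
pga = 0.1  # peak ground acceleration (assumed, in g)
ky = 0.2   # yield acceleration ratio (susceptibility class 3)
d = 6      # constant used for the 475-year return period

pgd = np.exp(a + b * np.log(pga) - c * ky + 0.535 * d)
print(round(pgd, 5))
```

The result (roughly 0.0026) falls in the same order of magnitude as the PGD table above, which is a useful plausibility check when adapting the model to other inputs.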
Calculate the probability of occurrence of the earthquake-induced landslide
# Extract years_values from the Scenaria DataFrame
years_values = Scenaria['1/years'].tolist()  # Use the annual probabilities as years values

# Map the landslide susceptibility class (SAMPLE_1) to a probability of landslide occurrence
def assign_plandslide_values(sample):
    plandslide_map = {1: 0.000, 2: 0.010, 3: 0.050, 4: 0.100, 5: 0.400}
    return plandslide_map.get(sample, 0.0)  # default when the class is not 1, 2, 3, 4, or 5

# Build one Plandslide column per years_value
plandslide_df_list = []
for years_value in years_values:
    plandslide_data = [
        {f'Plandslide-{years_value}': f"{assign_plandslide_values(row['SAMPLE_1']):.3f}"}
        for _, row in df.iterrows()
    ]
    plandslide_df_list.append(pd.DataFrame(plandslide_data))

# Concatenate all DataFrames into one common DataFrame along columns (axis=1)
Plandslide_df = pd.concat(plandslide_df_list, axis=1)
Plandslide_df.head()
| | Plandslide-0.0137 | Plandslide-0.0098 | Plandslide-0.0021 | Plandslide-0.001 | Plandslide-0.0003 | Plandslide-0.0001 |
|---|---|---|---|---|---|---|
| 0 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 |
| 1 | 0.010 | 0.010 | 0.010 | 0.010 | 0.010 | 0.010 |
| 2 | 0.050 | 0.050 | 0.050 | 0.050 | 0.050 | 0.050 |
| 3 | 0.010 | 0.010 | 0.010 | 0.010 | 0.010 | 0.010 |
| 4 | 0.010 | 0.010 | 0.010 | 0.010 | 0.010 | 0.010 |
def compute_landcdf_values(row, years_value):
    """
    Compute damage state exceedance probabilities (CDF values) from the PGD values.

    Parameters:
    - row: A row from the input DataFrame containing PGD values.
    - years_value: The years value used to access the correct PGD column.

    Returns:
    - A tuple of computed CDF values (landcdf1, landcdf2, landcdf3, landcdf4).
    """
    # Access the PGD value from the dynamic column
    pgd_column_name = f"PGD-{years_value}"  # Create the column name based on years_value
    pgd_value = row[pgd_column_name]

    # Fixed beta and scale values
    beta = 1.2
    scale_values = [0.254] * 4  # Four IM values, all set to 0.254

    # Calculate log-normal distributions with fixed values
    dist1 = calculate_log_normal_distribution(beta, scale_values[0])
    dist2 = calculate_log_normal_distribution(beta, scale_values[1])
    dist3 = calculate_log_normal_distribution(beta, scale_values[2])
    dist4 = calculate_log_normal_distribution(beta, scale_values[3])

    # Compute CDF values using PGD as the x_value
    landcdf1 = dist1.cdf(pgd_value) if pd.notnull(pgd_value) else np.nan
    landcdf2 = dist2.cdf(pgd_value) if pd.notnull(pgd_value) else np.nan
    landcdf3 = dist3.cdf(pgd_value) if pd.notnull(pgd_value) else np.nan
    landcdf4 = dist4.cdf(pgd_value) if pd.notnull(pgd_value) else np.nan

    return landcdf1, landcdf2, landcdf3, landcdf4

# Create a DataFrame to store the final results
Landcdf = pd.DataFrame()

# Iterate over each return period in the Scenaria DataFrame
for index, row in Scenaria.iterrows():
    years_value = row['1/years']  # Get the years value from the current row
    # Compute CDF values for each row in the PGD DataFrame
    cdf_results = [compute_landcdf_values(pgd_row, years_value) for _, pgd_row in PGD.iterrows()]
    # Convert the results to a DataFrame
    cdf_df = pd.DataFrame(
        cdf_results,
        columns=[f'LandDS{i}-{years_value}' for i in range(1, 5)]
    )
    # Concatenate the results to the main Landcdf DataFrame
    Landcdf = pd.concat([Landcdf, cdf_df], axis=1)
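The helper `calculate_log_normal_distribution` is defined earlier in the notebook; it is assumed to return a frozen log-normal distribution with dispersion `beta` and median `scale`. A minimal `scipy.stats` stand-in, shown only as an illustration of the fragility curve used here:

```python
from scipy.stats import lognorm

# Assumed stand-in for the notebook's calculate_log_normal_distribution helper:
# beta is the log-standard deviation (dispersion), scale is the median PGD
def calculate_log_normal_distribution(beta, scale):
    return lognorm(s=beta, scale=scale)

dist = calculate_log_normal_distribution(beta=1.2, scale=0.254)
print(dist.cdf(0.254))  # CDF evaluated at the median is 0.5 by construction
```

Evaluating the CDF at the median (0.254 m here) returning exactly 0.5 is a handy check that the parameterisation is correct.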
PL = 1.0#
Calculate the damage index (loss ratio) assuming that the probability of an earthquake-induced landslide is equal to the probability of the earthquake itself (i.e., \(P_L = 1.0\)), meaning the landslide will certainly occur given that the earthquake occurs.
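The compound exceedance probability in the cell below follows the union rule for the two damage mechanisms, \(P_{comb} = P_s + P_L \cdot P_l - P_s \cdot P_L \cdot P_l\), with \(P_L = 1.0\) in this section. A minimal numeric sketch with assumed probabilities (not taken from the dataset):

```python
def combine_damage_probabilities(p_seismic, p_landslide, p_occurrence=1.0):
    """Union of seismic and landslide damage state exceedance probabilities."""
    return p_seismic + p_occurrence * p_landslide - p_seismic * p_occurrence * p_landslide

# Assumed illustrative exceedance probabilities
print(combine_damage_probabilities(0.3, 0.2))  # ~0.44 when the landslide certainly occurs
```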
# Convert relevant columns to numeric if they are not already
DI_seismic = DI_seismic.apply(pd.to_numeric, errors='coerce')
Landcdf = Landcdf.apply(pd.to_numeric, errors='coerce')

# Extract years_values from the Scenaria DataFrame
years_values = Scenaria['1/years'].tolist()  # Use the annual probabilities as years values

# Initialize a new DataFrame for combined results
DI_combined_P1 = pd.DataFrame()

# List of damage states (DS1-DS4)
datasets = [1, 2, 3, 4]

# Combine seismic and landslide exceedance probabilities for each damage state (P_L = 1.0)
for years_value in years_values:
    for dataset in datasets:
        DI_combined_P1[f"CombDS{dataset}-{years_value}"] = (
            DI_seismic[f"CDF{dataset}-{years_value}"] +
            1 * Landcdf[f"LandDS{dataset}-{years_value}"] -
            DI_seismic[f"CDF{dataset}-{years_value}"] * 1 * Landcdf[f"LandDS{dataset}-{years_value}"]
        )

# Derive damage state occurrence probabilities and the damage index per return period
for years_value in years_values:
    # Calculate probabilities of occurrence for each damage state
    DI_combined_P1[f"P_DS1-{years_value}"] = DI_combined_P1[f"CombDS1-{years_value}"] - DI_combined_P1[f"CombDS2-{years_value}"]
    DI_combined_P1[f"P_DS2-{years_value}"] = DI_combined_P1[f"CombDS2-{years_value}"] - DI_combined_P1[f"CombDS3-{years_value}"]
    DI_combined_P1[f"P_DS3-{years_value}"] = DI_combined_P1[f"CombDS3-{years_value}"] - DI_combined_P1[f"CombDS4-{years_value}"]
    DI_combined_P1[f"P_DS4-{years_value}"] = DI_combined_P1[f"CombDS4-{years_value}"]

    # Calculate the damage index for the scenario based on the building typology
    for index in range(len(DI_combined_P1)):
        if df['Material'].iloc[index] == 'Precast':
            DI_combined_P1.at[index, f"DI-{years_value}"] = (
                DI_combined_P1.at[index, f"P_DS1-{years_value}"] * 0.05 +
                DI_combined_P1.at[index, f"P_DS2-{years_value}"] * 0.30 +
                DI_combined_P1.at[index, f"P_DS3-{years_value}"] * 0.70 +
                DI_combined_P1.at[index, f"P_DS4-{years_value}"] * 1
            )
        else:
            DI_combined_P1.at[index, f"DI-{years_value}"] = (
                DI_combined_P1.at[index, f"P_DS1-{years_value}"] * 0.05 +
                DI_combined_P1.at[index, f"P_DS2-{years_value}"] * 0.15 +
                DI_combined_P1.at[index, f"P_DS3-{years_value}"] * 0.60 +
                DI_combined_P1.at[index, f"P_DS4-{years_value}"] * 1
            )
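The damage index computed above is the expected loss ratio: the probability of each damage state times its central loss ratio. A standalone numeric sketch with assumed damage state probabilities (the loss ratios 0.05/0.15/0.60/1.0 are the non-precast values used above):

```python
# Assumed damage state occurrence probabilities for one asset (illustrative only)
p_ds = [0.10, 0.20, 0.10, 0.05]
# Central loss ratios per damage state for non-precast typologies (from the cell above)
loss_ratios = [0.05, 0.15, 0.60, 1.00]

damage_index = sum(p * lr for p, lr in zip(p_ds, loss_ratios))
print(round(damage_index, 3))  # → 0.145
```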
Calculate the annual collapse probability based on the STREST methodology
# Return periods and corresponding probabilities of exceedance in 50 years
Periods = [5, 73, 102, 475, 1000, 2500, 5000, 10000]
lambda1 = [0.9999, 0.5, 0.39, 0.1, 0.05, 0.02, 0.01, 0.005]

# Compute annual exceedance rates λ
lambda_values = [1 / (-50 / np.log(1 - l)) for l in lambda1]

# Compute P[IM_i] as the central difference of adjacent exceedance rates
P_IM = []
for t in range(1, len(lambda_values) - 1):  # The first and last index lack a neighbour on each side
    P_IM.append((lambda_values[t - 1] - lambda_values[t + 1]) / 2)

# Create a dictionary mapping return periods to P[IM_i]
P_IM_dict = {data['1/years'][i]: P_IM[i] for i in range(len(P_IM))}

# Multiply each "CombDS4-{r}" column by the corresponding P[IM_i]
for r in data['1/years']:
    column_name = f"CombDS4-{r}"
    new_column_name = f"{column_name}*P[IM_i]"
    if column_name in DI_combined_P1.columns:
        DI_combined_P1[new_column_name] = DI_combined_P1[column_name] * P_IM_dict[r]

# Sum the weighted collapse probabilities into the annual collapse probability
new_columns = [f"CombDS4-{r}*P[IM_i]" for r in data['1/years'] if f"CombDS4-{r}" in DI_combined_P1.columns]
DI_combined_P1["Annual Collapse Probability"] = DI_combined_P1[new_columns].sum(axis=1)
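The rate conversion above is the standard Poisson relation \(\lambda = -\ln(1 - p_{50})/50\), whose reciprocal recovers the return period. A quick check for the 50%-in-50-years probability:

```python
import numpy as np

p50 = 0.5  # probability of exceedance in 50 years
lam = -np.log(1 - p50) / 50  # annual exceedance rate
print(round(1 / lam, 1))  # → 72.1, the return period in years (listed as 73 above)
```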
Create and save the final DataFrame containing the damage index (loss ratio) for different return periods along with the annual probability of collapse for each exposure point.
# Concatenate df, IM, ky, PGD, Landcdf and DI_combined_P1 horizontally, aligning by index
common_df_combined_P1 = pd.concat([df, IM, ky, PGD, Landcdf, DI_combined_P1], axis=1)

# Define the output path first
seismic_landslide_P1_file_path = savepath / f"Multi-Hazard/Earthquakes & Landslides/DI_Earthquakes_&_Landslides_P1_ESRM20_{asset}.csv"

# Ensure the directory exists
seismic_landslide_P1_file_path.parent.mkdir(parents=True, exist_ok=True)

# Save the DataFrame to CSV
common_df_combined_P1.to_csv(seismic_landslide_P1_file_path, index=False)
Visualise loss ratio map for a return period of 1000 years
# Build point geometries from the coordinates
common_df_combined_P1['geometry'] = common_df_combined_P1.apply(lambda row: Point(row['Longitude'], row['Latitude']), axis=1)

# Convert to GeoDataFrame
combined_P1_gdf = gpd.GeoDataFrame(common_df_combined_P1, geometry='geometry', crs="EPSG:4326")

# Create bins for the DI-0.001 column (annual probability 0.001, i.e. T = 1000 years)
bins = [0, 0.1, 0.25, 0.4, 1]  # Define the bin ranges
labels = ['0-0.1', '0.1-0.25', '0.25-0.4', '0.4-1']  # Labels for the bins

# Assign bin values based on 'DI-0.001'
combined_P1_gdf['DI_binned'] = pd.cut(combined_P1_gdf['DI-0.001'], bins=bins, labels=labels, right=False)

# Interactive map coloured by the binned loss ratio
combined_P1_gdf.explore(
    column="DI_binned",  # Use the binned DI-0.001 column for coloring
    cmap='OrRd',         # Colormap for the four classes
    stroke=True,         # Add stroke (outline) around markers
    line_color="black",  # Set the stroke color to black
    legend=True,         # Show the legend
    legend_kwds={
        'caption': 'Loss ratio (T=1000 years)',  # Legend caption
        'orientation': 'horizontal'              # Horizontal legend
    },
    marker_kwds={
        'radius': 5,  # Set marker size (adjust as needed)
        'fill': True, # Fill the markers with color
    },
    tooltip="DI-0.001",  # Show the DI-0.001 value in the tooltip
)
PL < 1.0#
Calculate the damage index (loss ratio) assuming that the landslide is not certain to occur given the earthquake (i.e., \(P_L < 1.0\)): the occurrence probability of the earthquake-induced landslide is taken from the susceptibility-dependent values in Plandslide_df computed above.
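Compared with the \(P_L = 1.0\) case, the landslide contribution is now weighted by its occurrence probability. An illustrative calculation with assumed values (not taken from the dataset):

```python
def combine_with_occurrence(p_seismic, p_occurrence, p_landslide):
    """Compound exceedance probability when the landslide occurs with probability p_occurrence."""
    return p_seismic + p_occurrence * p_landslide - p_seismic * p_occurrence * p_landslide

# Assumed values: seismic exceedance 0.3, landslide occurrence 0.05
# (susceptibility class 3 in the mapping above), landslide damage exceedance 0.4
print(combine_with_occurrence(0.3, 0.05, 0.4))  # ~0.314
```

With a small occurrence probability the compound value stays close to the seismic-only probability, which is the expected behaviour for low-susceptibility sites.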
# Convert relevant columns to numeric if they are not already
DI_seismic = DI_seismic.apply(pd.to_numeric, errors='coerce')
Landcdf = Landcdf.apply(pd.to_numeric, errors='coerce')
Plandslide_df = Plandslide_df.apply(pd.to_numeric, errors='coerce')

# Extract years_values from the Scenaria DataFrame
years_values = Scenaria['1/years'].tolist()  # Use the annual probabilities as years values

# Initialize a new DataFrame for combined results
DI_combined = pd.DataFrame()

# List of damage states (DS1-DS4)
datasets = [1, 2, 3, 4]

# Combine seismic and landslide exceedance probabilities, weighted by the landslide occurrence probability
for years_value in years_values:
    for dataset in datasets:
        DI_combined[f"CombDS{dataset}-{years_value}"] = (
            DI_seismic[f"CDF{dataset}-{years_value}"] +
            Plandslide_df[f'Plandslide-{years_value}'] * Landcdf[f"LandDS{dataset}-{years_value}"] -
            DI_seismic[f"CDF{dataset}-{years_value}"] * Plandslide_df[f'Plandslide-{years_value}'] * Landcdf[f"LandDS{dataset}-{years_value}"]
        )

# Derive damage state occurrence probabilities and the damage index per return period
for years_value in years_values:
    # Calculate probabilities of occurrence for each damage state
    DI_combined[f"P_DS1-{years_value}"] = DI_combined[f"CombDS1-{years_value}"] - DI_combined[f"CombDS2-{years_value}"]
    DI_combined[f"P_DS2-{years_value}"] = DI_combined[f"CombDS2-{years_value}"] - DI_combined[f"CombDS3-{years_value}"]
    DI_combined[f"P_DS3-{years_value}"] = DI_combined[f"CombDS3-{years_value}"] - DI_combined[f"CombDS4-{years_value}"]
    DI_combined[f"P_DS4-{years_value}"] = DI_combined[f"CombDS4-{years_value}"]

    # Calculate the damage index for the scenario, checking the Material of each row
    for index in range(len(DI_combined)):
        if df['Material'].iloc[index] == 'Precast':
            DI_combined.at[index, f"DI-{years_value}"] = (
                DI_combined.at[index, f"P_DS1-{years_value}"] * 0.05 +
                DI_combined.at[index, f"P_DS2-{years_value}"] * 0.30 +
                DI_combined.at[index, f"P_DS3-{years_value}"] * 0.70 +
                DI_combined.at[index, f"P_DS4-{years_value}"] * 1
            )
        else:
            DI_combined.at[index, f"DI-{years_value}"] = (
                DI_combined.at[index, f"P_DS1-{years_value}"] * 0.05 +
                DI_combined.at[index, f"P_DS2-{years_value}"] * 0.15 +
                DI_combined.at[index, f"P_DS3-{years_value}"] * 0.60 +
                DI_combined.at[index, f"P_DS4-{years_value}"] * 1
            )
Calculate the annual collapse probability based on the STREST methodology
# Return periods and corresponding probabilities of exceedance in 50 years
Periods = [5, 73, 102, 475, 1000, 2500, 5000, 10000]
lambda1 = [0.9999, 0.5, 0.39, 0.1, 0.05, 0.02, 0.01, 0.005]

# Compute annual exceedance rates λ
lambda_values = [1 / (-50 / np.log(1 - l)) for l in lambda1]

# Compute P[IM_i] as the central difference of adjacent exceedance rates
P_IM = []
for t in range(1, len(lambda_values) - 1):  # The first and last index lack a neighbour on each side
    P_IM.append((lambda_values[t - 1] - lambda_values[t + 1]) / 2)

# Create a dictionary mapping return periods to P[IM_i]
P_IM_dict = {data['1/years'][i]: P_IM[i] for i in range(len(P_IM))}

# Multiply each "CombDS4-{r}" column by the corresponding P[IM_i]
for r in data['1/years']:
    column_name = f"CombDS4-{r}"
    new_column_name = f"{column_name}*P[IM_i]"
    if column_name in DI_combined.columns:
        DI_combined[new_column_name] = DI_combined[column_name] * P_IM_dict[r]

# Sum the weighted collapse probabilities into the annual collapse probability
new_columns = [f"CombDS4-{r}*P[IM_i]" for r in data['1/years'] if f"CombDS4-{r}" in DI_combined.columns]
DI_combined["Annual Collapse Probability"] = DI_combined[new_columns].sum(axis=1)
Create and save the final DataFrame containing the damage index (loss ratio) for different return periods along with the annual probability of collapse for each exposure point.
# Concatenate df, IM, ky, PGD, Landcdf and DI_combined horizontally, aligning by index
common_df_combined = pd.concat([df, IM, ky, PGD, Landcdf, DI_combined], axis=1)

# Define the output path first
seismic_landslide_file_path = savepath / f"Multi-Hazard/Earthquakes & Landslides/DI_Earthquakes_&_Landslides_Plandslide_ESRM20_{asset}.csv"

# Ensure the directory exists
seismic_landslide_file_path.parent.mkdir(parents=True, exist_ok=True)

# Save the DataFrame to CSV
common_df_combined.to_csv(seismic_landslide_file_path, index=False)
Visualise loss ratio map for a return period of 1000 years
# Build point geometries from the coordinates
common_df_combined['geometry'] = common_df_combined.apply(lambda row: Point(row['Longitude'], row['Latitude']), axis=1)

# Convert to GeoDataFrame
combined_gdf = gpd.GeoDataFrame(common_df_combined, geometry='geometry', crs="EPSG:4326")

# Create bins for the DI-0.001 column (annual probability 0.001, i.e. T = 1000 years)
bins = [0, 0.1, 0.25, 0.4, 1]  # Define the bin ranges
labels = ['0-0.1', '0.1-0.25', '0.25-0.4', '0.4-1']  # Labels for the bins

# Assign bin values based on 'DI-0.001'
combined_gdf['DI_binned'] = pd.cut(combined_gdf['DI-0.001'], bins=bins, labels=labels, right=False)

# Interactive map coloured by the binned loss ratio
combined_gdf.explore(
    column="DI_binned",  # Use the binned DI-0.001 column for coloring
    cmap='OrRd',         # Colormap for the four classes
    stroke=True,         # Add stroke (outline) around markers
    line_color="black",  # Set the stroke color to black
    legend=True,         # Show the legend
    legend_kwds={
        'caption': 'Loss ratio (T=1000 years)',  # Legend caption
        'orientation': 'horizontal'              # Horizontal legend
    },
    marker_kwds={
        'radius': 5,  # Set marker size (adjust as needed)
        'fill': True, # Fill the markers with color
    },
    tooltip="DI-0.001",  # Show the DI-0.001 value in the tooltip
)